這篇我們總於要來看一下怎麼實際用 Effect 來實作一些功能了,第一篇是資料清理,我們要來看怎麼使用 Effect 協助我們清理資料,並安全的處理各種錯誤的情況
所有的實戰分享都是我有實際使用過 Effect 的案例,雖然大部份的案例沒辦法讓各位看到原本的實際的程式碼,不過我會盡可能的讓案例符合現實的情況
今天你拿到了一個老舊的系統,這個老舊系統大部份的資料並沒有使用資料庫,前人將資料使用一堆 json 檔存了起來,今天你的目標是將這些資料想辦法存到資料庫中,但存進去前,你碰到了幾個問題
以上的情況經過一點改編,我知道這個系統設計聽起來很不可思議
好消息是前人還是留下了一些東西
在進行這項工程時,我們訂下了幾個準則:
關於第 1 點,因為牽扯到之後要講的資料牽移,我們等到「15. Effect 實戰分享 3: 資料遷移」才會再來聊到,這次一起來看第 2 點我們是怎麼實作的吧
在一開始,我們需要先知道哪些資料是正常的,因此做為第一步,我們把現有的 TypeScript 的 type 都轉換成了 zod
的 schema
雖然 Effect 也有 schema 的實作,不過大部份情況下,我還是偏好 zod 的實作,之後也可以聊聊我什麼時候會用 zod ,什麼時候會用 Effect 的
例如原本的 type 長的像這樣
interface Item {
id: string
field1: string
field2: string
// 有些欄位是後來加入的,它有預設值,但儲存資料裡面可能不一定有
newButRequiredField: string
// 有些欄位早就已經沒有用到了
obsoleteField: string
}
那我們把這個 type 轉換成 zod 的 schema 吧
const ItemSchema = z.object({
id: z.string(),
field1: z.string(),
field2: z.string(),
newButRequiredField: z.string(),
obsoleteField: z.string(),
})
接著再用 Effect 將整個 parse 的過程包起來
import { Array, Effect, Either } from 'effect'
function parseData(input: unknown) {
return Effect.try({
try: () => ItemSchema.parse(input),
catch: (error) =>
// 這邊我們可以用上一篇建立的 ValidationError
new ValidationError({
message: 'parse error',
data: input,
cause: error,
}),
})
}
const [errors, validData] = pipe(
// 載入的是一個陣列資料
loadData(),
Effect.flatMap((data) =>
pipe(
data.map((item) => parseData(item)),
// 還記得之前提過的 either mode 嗎?
Effect.allWith({ mode: 'either' }),
),
),
Effect.map((parsed) =>
pipe(
parsed,
// 這邊使用了 Effect 提供的 Array 的 helper ,把資料分成成功 parse 的跟失敗的
Array.partitionMap((result, index) =>
// 將 Either 的 Left ,也就是錯誤的資料加上 `index` 方便我們之後去找這這些資料
Either.mapLeft(result, (error) => ({ error, index })),
),
),
),
Effect.runSync,
)
到這邊你可能會覺得:「那這跟我自己寫個迴圈處理有什麼差別?」,確實目前還可以寫成
const errors = []
const validData = []
const raw = loadData()
for (const [index, item] of raw.entries()) {
const parsed = ItemSchema.safeParse(item)
if (parsed.success) {
validData.push(parsed.data)
} else {
errors.push({
index,
error: new ValidationError({
message: 'parse error',
data: item,
cause: parsed.error
})
})
}
}
下面的版本可讀性感覺還比較好,但這還沒結束,讓我們先繼續下去。我們可以先把錯誤的部份印出來,使用 zod.fyi 可以讓錯誤訊息更好閱讀,那時我還自己寫了個小工具讓我不用一個一個的複製貼上
收集到各種的問題後,我們開始著手處理異常的資料,異常的資料大致有分成
{ error: 'Internal server error' }
到這邊,基本上就是不斷的調整 schema ,盡可能的拯救資料,例如為欄位加上預設值,直接放棄非必要的欄位
const ItemSchema = z.object({
id: z.string(),
field1: z.string(),
field2: z.string(),
// 加上預設值
newButRequiredField: z.string().default('default value'),
// 對照 code 後移除非必要的欄位
})
這個系統在演進的過程中,在某次更新加入了資料庫,造成在上面的資料中,有部份已經被存一份到資料庫中了,有在資料庫中的資料,在紀錄時會多一個資料庫中的 id 欄位叫 serverId
,同樣也是 uuid
const ItemSchema = z.object({
id: z.string(),
serverId: z.string().optional(),
field1: z.string(),
field2: z.string(),
newButRequiredField: z.string().default('default value'),
})
我們的目前是將資料轉移到資料庫中,對於已經在資料庫中有的資料就不用繼續處理了,於是我們這邊多了一個檢查是否已經存在於資料庫中的非同步的 function checkAvailableInDb
,接下來我們重新設計一下流程
重新設計後的流程長的像這樣
const [errors, validData] = await pipe(
loadData(),
Effect.flatMap((data) =>
pipe(
data.map((item) => parseData(item)),
Effect.allWith({ mode: 'either' }),
),
),
Effect.map((parsed) =>
pipe(
parsed,
Array.partitionMap((result, index) =>
Either.mapLeft(result, (error) => ({ error, index })),
),
),
),
Effect.flatMap(([errors, validData]) =>
Effect.all([
Effect.succeed(errors),
Effect.filter(
validData,
(item) =>
// 檢查是否在 db 中
item.serverId
? pipe(
checkAvailableInDb(item.serverId),
Effect.map((available) => !available),
)
: Effect.succeed(false),
// 控制最多同時執行 3 個
{ concurrency: 3 },
),
]),
),
// 中間多了 async 的動作,換成使用 runPromise
Effect.runPromise,
)
你可能會想到為什麼檢查是否存在沒辦法一次進行大量查詢,這牽涉到這個老舊系統當初的設計,如果可以一次處理,整體流程可以變的更簡單
我們稍微整理一下整個流程,在這邊你會發現, FP 的一個好處是每個操作都是各別的函式,要拆開來只需要把相關的 code 剪下貼上到一個 function 中,就可以很好的分解了,我們直接看以下的範例
interface ParseErrorResult {
index: number
error: ValidationError
}
type ParseResult = [ParseErrorResult[], Item[]]
function parseDatum(data: unknown[]): Effect.Effect<ParseResult> {
return pipe(
data.map((item) => parseData(item)),
Effect.allWith({ mode: 'either' }),
Effect.map((parsed) =>
Array.partitionMap(parsed, (result, index) =>
Either.mapLeft(result, (error) => ({ error, index })),
),
),
)
}
function checkParseResultInDb([errors, validData]: ParseResult): Effect.Effect<ParseResult> {
return Effect.all([
Effect.succeed(errors),
Effect.filter(
validData,
(item) =>
item.serverId
? pipe(
checkAvailableInDb(item.serverId),
Effect.map((available) => !available),
)
: Effect.succeed(false),
{ concurrency: 3 },
),
])
}
const [errors, validData] = await pipe(
loadData(),
Effect.flatMap(parseDatum),
Effect.flatMap(checkParseResultInDb),
Effect.runPromise,
)
這樣是否好讀多了呢,我們來看一下如果用 promise 會變的怎麼樣
function parseWithoutEffect(): ParseResult {
const errors = []
const validData = []
const raw = loadData()
for (const [index, item] of raw.entries()) {
const parsed = ItemSchema.safeParse(item)
if (parsed.success) {
validData.push(parsed.data)
} else {
errors.push({
index,
error: new ValidationError({
message: 'parse error',
data: item,
cause: parsed.error,
}),
})
}
}
return [errors, validData]
}
function checkAvailableWithPromise(validData: Item[]): Promise<Item[]> {
// 跟之前一樣,使用 p-map 幫忙處理 concurrency 限制
return pMap(validData, async (item) => {
if (!item.serverId) {
return pMapSkip
}
if (!await checkAvailableInDb(item.serverId)) {
return pMapSkip
}
return item
}, {concurrency: 3})
}
const [errors, validDataWithoutAvailableCheck] = parseWithoutEffect()
const validData = await checkAvailableWithPromise(validDataWithoutAvailableCheck)
目前以我自己看下來, Effect 的版本可讀性看起來跟直接寫的版本差不多了,但 Effect 的版本還是有些好處存在,例如
下一篇要來介紹 Effect 最強大的功能,排程與錯誤重試